162

     # Dag - синтаксис. Основы

Полезный ресурс: https://www.youtube.com/watch?v=G6ipydgZRnE

Содержание

  • Что такое DAG
  • Что такое task
  • Структура DAG-файла
  • Запуск первого DAG-файла в Airflow
  • Подключение Python-функций
  • Использование нескольких DAG-ов в одном файле
  • Обмен сообщений через XCOM
    • Как сохранять данные в XCOM
    • Как читать данные из XCOM
      • Как работает передача данных через XCOM
    • PythonOperator - способы сохранения и чтения результата в XCOM
      • Передача одного значения с использованием return
      • Передача нескольких значений с использованием return
      • Передача одного значения с использованием XCOM_push
      • Передача нескольких значений с использованием XCOM_push
      • Способы доступа к значениям XCOM
    • BashOperator - способы сохранения и чтения результата в XCOM

Что такое DAG

DAG-и (Directed Acyclic Graph) используются в Airflow (платформа, построенная на Python), поэтому и DAG-и - это Python-код.

Airflow - это best practice для процесса ETL:

  • extract data
  • transform data
  • load data

Соответственно обработку данных мы будем проводить через DAG-и. Внутри DAG-а получение, обработка данных идёт через задачи (таски):

        task 2
      /        \
task 1           task 4 - task 5  
      \        /                
        task 3 

В рамках одного дага таски могут иметь абослютно разные зависимости, последовательность выполнения. В рамках каждой задачи выполняется какое-либо специфичное действие, напр.:

task 1 - получение информации из базы данных
task 2 - фильтрация информации по одним критериям и сохранение в каком-либо источнике
task 3 - фильтрация данных по другим критериям
task 4 - сохранение данных по полученным критериям из разных задач
task 5 - сохранение тех же данных, но в другом формате и в другом месте

Тем самым мы получаем выстроенный пайплайн, основанный на DAG-е.

Что такое task

Все промежуточные действия внутри DAG-а реализуются на уровне задач (тасок / tasks).

Работа внутри тасок держится на операторах. Операторы бывают разных типов:

  • airflow.operators.bash - оператор для запуска bash-команд
  • airflow.operators.branch - для ветвления
  • airflow.operators.datetime
  • airflow.operators.empty
  • airflow.operators.email - опреатор для отправки e-mail-а
  • airflow.operators.generic_transfer
  • airflow.operators.latest_only - перезапуск одной последней задачи
  • airflow.operators.python - произвольный python-код
  • airflow.operators.subdag
  • airflow.operators.trigger_dugrun
  • PostgresOperator - оператор для вызова sql-запросов в PostgreSQL БД

Также таски могут представлять из себя сенсоры:

  • PythonSensor - ждём, когда функция вернёт True
  • RedisKeySensor - проверяет, существует ли переданный ключ в Redis хранилище
  • S3Sensor - проверяет наличие объекта по ключу в S3-бакете
  • RedisPubSubSensor - проверяет наличие сообщения в pub-sub очереди

Структура DAG-файла

Импортируем модуль DAG:

from airflow import DAG

Описание DAG-а

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
args = {
    # общие настройки
}

with DAG(
        # общие настройки

) as dag:
    # описание задач
    my_task_1 = ...
    my_task_2 = ...


# Запускаем задачи, указывая последовательность
my_task_1 >> my_task_2

Разбор DAG-файла

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# общие настройки
args = {
    'owner': 'dao2',
    'start_date': datetime(2024, 1, 1),
    'provide_context': True
}

with DAG(

    "First-Dag-Name",
    #dag_id="First-Dag-Name",
    description="Первый даг",
    schedule_interval='*/1 * * * *',
    #start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    default_args=args,
    tags=["dao2"]

) as dag:
    # описание задач
    my_task_1 = ...
    my_task_2 = ...


# Запуск задач
my_task_1 >> my_task_2
  • 'owner': 'dao2',: отображение в UI Airflow, кому принадлежит dag
  • 'start_date': самая ранняя дата анализа дагом, как далеко смотреть "назад в прошлое" при запуске Dag
  • 'provide_context': True: передавать контекст во всех задачах
  • "First-Dag-Name" или dag_id="First-Dag-Name": имя, под которым в UI будет отображаться этот Dag
  • словарь с настройками args мы передаём в наш даг через переменную default_args.
  • schedule_interval='*/1 * * * *': выполнять каждую минуту
  • catchup=False: игнорировать настройку start_date и начинать с текущего момента
  • #start_date=pendulum.datetime(2023, 1, 1, tz="UTC"): закомментирован, так как настройки start_date у нас уже прописаны в args

Создаём первый полноценный работающий DAG:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# общие настройки
args = {
    'owner': 'dao2',
    'start_date': datetime(2024, 1, 1),
    'provide_context': True
}

with DAG(

    #"First-Dag-Name",
    dag_id="First-Dag-Name",
    description="Первый даг",
    schedule_interval='*/1 * * * *',
    catchup=False,
    default_args=args,

) as dag:

    # описание задач
    my_task_1 = BashOperator(
        task_id = 'task_1',
        bash_command = 'echo $(date): запущен таск 1'
    )

    my_task_2 = BashOperator(
        task_id = 'task_2',
        bash_command = 'echo $(date): запущен таск 2'
    )

    my_task_3 = BashOperator(
        task_id = 'task_3',
        bash_command = 'echo $(date): запущен таск 3'
    )

# Запуск задач
my_task_1 >> my_task_2
my_task_1 >> my_task_3
my_task_3 >> my_task_2

Запуск первого DAG-файла в Airflow

Чтобы запустить DAG, сначала требуется настроить окружение. Установить Airflow локально можно несколькими способами:

  • docker
  • docker-compose
  • pip

В статье показана установка через pip.

Подключение Python-функций

Ранее уже был показан пример синтаксиса, но он может отличаться, есть разные варианты написания DAG-а на уровне Python. Один из таких примеров приведён ниже:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

def do_some_job():
    print("some job done")
    print(f"dag details: {read_task}")

def do_another_job():
    print("do_another_job")
    print(f"dag details: {dag}")

default_args = {
    'owner': 'dao2',
    'start_date': datetime.now() - timedelta(days=1),
    'retries': 1
}

dag = DAG(
    dag_id='my_id',
    default_args=default_args,
    schedule_interval='@daily'
)

start_task = DummyOperator(task_id='start_task', dag=dag)

read_task = PythonOperator(
    task_id='read_task',
    provide_context=True,
    python_callable=do_some_job,
    dag=dag
)

write_task = PythonOperator(
    task_id='write_task',
    provide_context=True,
    python_callable=do_another_job,
    dag=dag
)

end_task = DummyOperator(task_id='end_task', dag=dag)

start_task >> read_task >> write_task >> end_task

Как видно через Python-операторы и переменную python_callable мы передаём функцию, которая объявлена выше в коде. Также можно сначала импортировать функцию из другого файла и обращаться к функции, объявленной в импорте.

В вызываемую через python_callable функции можно обращаться к переменным, указанным в данном даге. А именно:

[2024-01-26, 20:18:01 UTC] {logging_mixin.py:188} INFO - some job done
[2024-01-26, 20:18:01 UTC] {logging_mixin.py:188} INFO - dag details: <Task(PythonOperator): read_task>

[2024-01-26, 20:11:19 UTC] {logging_mixin.py:188} INFO - do_another_job
[2024-01-26, 20:11:19 UTC] {logging_mixin.py:188} INFO - dag details: <DAG: my_id>

Использование нескольких DAG-ов в одном файле

В одном и том же python-файле файле может быть задано более одного DAG-а, а также сколько угодно конфигураций. Именно поэтому мы явно прописываем, какую конфигурацию использовать каждому DAG-у (default_args=), а также какие DAG-и использовать в тасках (dag=dag2).

Код ниже является не самым очевидным, так как названия переменных совпадают для разных дагов:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta

def do_some_job():
    print("some job done")
    print(f"dag details: {read_task}")

def do_another_job():
    print("do_another_job")
    print(f"dag details: {dag.task_dict}")

default_args = {
    'owner': 'dao2',
    'start_date': datetime.now() - timedelta(days=1),
    'retries': 1
}

dag = DAG(
    dag_id='my_id',
    default_args=default_args,
    schedule_interval='@daily',
    tags=["dao2"]
)

dag2 = DAG(
    dag_id='id_2',
    default_args=default_args,
    schedule_interval='@daily',
    tags=["dao2"]
)

start_task = DummyOperator(task_id='start_task', dag=dag)

read_task = PythonOperator(
    task_id='read_task',
    provide_context=True,
    python_callable=do_some_job,
    dag=dag
)

write_task = PythonOperator(
    task_id='write_task',
    provide_context=True,
    python_callable=do_another_job,
    dag=dag
)

end_task = DummyOperator(task_id='end_task', dag=dag)
start_task >> read_task >> write_task >> end_task


start_task = DummyOperator(task_id='start_task', dag=dag2)

read_task = PythonOperator(
    task_id='read_task',
    provide_context=True,
    python_callable=do_some_job,
    dag=dag2
)

write_task = PythonOperator(
    task_id='write_task',
    provide_context=True,
    python_callable=do_another_job,
    dag=dag2
)

end_task = DummyOperator(task_id='end_task', dag=dag2)

start_task >> read_task >> write_task >> end_task

На уровне UI Airflow после импорта данного py-скрипта в UI просто появятся две разные задачи:

Важно: запуская задачи, мы не можем взять одну задачу от одного дага и привязать её к задаче от другого. Задачи могут принадлежать только одному дагу и соответственно совместно использоваться только внутри одного дага.

Обмен сообщений через XCOM

Даг может состоять из большого количества задач, каждая задача имеет своё уникальное название:

read_task = PythonOperator(...
print(f"dag details: {read_task}")
{logging_mixin.py:188} INFO - dag details: <Task(PythonOperator): read_task>

Более того, мы можем просмотреть все "переменные" Dag-а:

print(f"dag details: {dag.task_dict}")
dag details: {
    'start_task': <Task(EmptyOperator): start_task>, 
    'read_task': <Task(PythonOperator): read_task>, 
    'write_task': <Task(PythonOperator): write_task>, 
    'end_task': <Task(EmptyOperator): end_task>}

Как видно на последнем примере, данный даг состоит из четырёх тасок, каждый из которых выполняется самостоятельно (имеет свою логику, свои данные и никак не зависит от другой задачи).

Но бывают случаи, когда нам может понадобиться узнать результат какой-либо другой задачи и выполнять последующие действия в зависимости от этого результата. Для этих целей используется менеджер контекста XCOM.

В таком случае во время выполнения первой задачи мы помещаем полученные данные в Metadata DB Airflow (вызываем XCOM push). После чего мы сможем обращаться к этим данным из других задач.

Чтобы пользоваться данным способом передачи данных требуется разобраться с его синтаксисом:

  • как сохранять данные
  • как доставать данные

Как сохранять данные в XCOM

Сохранение данных будем осуществлять через PythonOperator:

  1. Потребуется создать задачу, в рамках которой будут получены и сохранены данные
  2. Преобразование и сохранение данных делается через Python-фунцию, в которой требуется учитывать некоторые моменты

Создадим python-функцию, котороая возвращает какое-либо значение:

from datetime import datetime, timedelta
def get_time():
    time_is = str(datetime.now())
    print(f"Time: {time_is}")
    return time_is

Создадим задачу, которая вызывает эту функцию:

t1 = PythonOperator(
    task_id='get_time',
    python_callable=get_time
)

Напишем минимальный DAG с этой информацией:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def get_time():
    time_is = str(datetime.now())[:19]
    print(f"Time: {time_is}")
    return time_is

default_args = {
    'owner': 'dao2',
    'start_date': datetime.now() - timedelta(days=1),
}

dag = DAG(
    dag_id='xcom_minimal',
    default_args=default_args,
    schedule_interval='@daily',
    tags=["dao2"]
)

t1 = PythonOperator(
    task_id='get_time',
    python_callable=get_time,
    dag=dag
)

t1

В этом примере мы не пользовались никакими XCOM-ами, но в UI видно, что таска возвращает значение time_is:

Такое поведение объясняется наличием return значения в python-функции. При использовании опреатора PythonOperator совместно с функцией return значение автоматически сохраняется в Metadata DB Airflow.

Как читать данные из XCOM

Создадим вторую функцию, которая будет вычитывать это значение из первой задачи и в зависимости от него выводить результат:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
t2 = PythonOperator(
    task_id='print_result',
    python_callable=print_result,
    provide_context=True,
    dag=dag
)

def print_result(**context):
    print(f"context: {context}")
    tsk_inst = context['ti']
    res = tsk_inst.xcom_pull(task_ids='get_time')
    print(f"res: {res}")

Что здесь важно:

В задаче t2 мы сказали, чтобы в функцию передавался контекст: provide_context=True. А в самой функции добавили обработку именованных аргументов print_result(**context).

Какое слово использовать, не имеет значения. Кто-то пишет kwargs, кто-то context. Важно, чтобы этот аргумент начинался с **любое_название.

Слово context используется не просто так. Даже в задаче параметр, отвечающий за передачу контекста называется provide_context. Поэтому будем использовать именно это слово.

Если вы выполните этот код и взгляните в логах на вывод строки print(f"context: {context}"), то увидите огромную простыню информации:

[2024-01-27T02:52:29.632+0300] {logging_mixin.py:188} INFO - context: 
{'conf': <airflow.configuration.AirflowConfigParser object at 0x7f04b5b63fd0>, 
'dag': <DAG: xcom_minimal>, 
'dag_run': <DagRun xcom_minimal @ 2024-01-26 23:52:24.792984+00:00: manual__2024-01-26T23:52:24.792984+00:00
...

То есть это информация, передавамая в связи с вызовом данной задачи, является контекстом этой задачи (то есть содержит описание всех деталей, относящихся к задаче). Один из перечисленных там параметров - это 'ti':

'task': <Task(PythonOperator): print_result>,
'task_instance': <TaskInstance: xcom_minimal.print_result manual__2024-01-26T23:52:24.792984+00:00 [running]>,
'task_instance_key_str': 'xcom_minimal__print_result__20240126',
'test_mode': False,
'ti': <TaskInstance: xcom_minimal.print_result manual__2024-01-26T23:52:24.792984+00:00 [running]>

На момент исполнения задачи этот параметр содержит:

  • информацию о названии таска: `TaskInstance: xcom_minimal.print_result
  • как и когда он запущен (вручную): manual__2024-01-26
  • его статус: running

Эта информация дана для общего ознакомления, чтобы объяснить, что существует контекст, который представляет из себя словарь и у которого имеется ключ ti.

Как работает передача данных через XCOM

Как и почему мы можем получать данные из других задач через этот ключ? Данный ключ может использоваться и используется XCOM-ом при необходимости передачи информации.

Чтобы подключиться к XCOM-у, нам нужен контекст и мы активируем его передачу внутри таска:

1
2
t2 = PythonOperator(
    provide_context=True

Контекст - это словарь, чтобы работать с ним в функции, включаем приём именованных аргументов:

def print_result(**context):

Чтобы получить возможность взаимодейстовавать с менеджером XCOM (неважно для чтения или записи), создаём переменную:

tsk_inst = context['ti']

После чего мы обращаемся к XCOM и вычитываем необходимое значение по id-таска, в рамках которого оно было туда сохранено.

res = tsk_inst.xcom_pull(task_ids='get_time')

Другими словами: мы лезем в Airflow DB и по названию задачи достаём оттуда значение, сохраненное после выполнения задачи.

Рассмотрим финальный пример:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def get_time():
    time_is = str(datetime.now())[:19]
    print(f"Time: {time_is}")
    return time_is

def print_result(**context):
    print(f"context: {context}")
    tsk_inst = context['ti']
    res = tsk_inst.xcom_pull(task_ids='get_time')
    print(f"res: {res}")
    if len(res) == 19:
        print("Корректная длина данных")
        return "Ok"
    else:
        print("С данными что-то не так")
        return "Error"

def get_ti_context_all(**context):
    tsk_inst = context['ti']
    print(f"xcom_pull 'get_time': {tsk_inst.xcom_pull(task_ids='get_time')}")
    print(f"xcom_pull 'print_result': {tsk_inst.xcom_pull(task_ids='print_result')}")

default_args = {
    'owner': 'dao2',
    'start_date': datetime.now() - timedelta(days=1),
}

dag = DAG(
    dag_id='xcom_minimal',
    default_args=default_args,
    schedule_interval='@daily',
    tags=["dao2"]
)

t1 = PythonOperator(
    task_id='get_time',
    python_callable=get_time,
    provide_context=True,
    dag=dag
)

t2 = PythonOperator(
    task_id='print_result',
    python_callable=print_result,
    provide_context=True,
    dag=dag
)

t3 = PythonOperator(
    task_id='get_ti_context_all',
    python_callable=get_ti_context_all,
    provide_context=True,
    dag=dag
)

t1 >> t2 >> t3
  • Мы создали три задачи: t1, t2, t3
  • Мы запустили их последовательно: t1 >> t2 >> t3
  • В первой задаче (а точнее в функции get_time) за счёт использования return мы сохранили результат в Metadata базе данных)
  • Во второй задаче мы проверили результат и сохранили его в XCOM за счёт использования return
  • В третьем таске просто дополнительно распечатали результат обеих задач
#task 1
INFO - Time: 2024-01-27 03:48:13

#task 2
INFO - res: 2024-01-27 03:48:13
INFO - Корректная длина данных

#task 3
INFO - xcom_pull 'get_time': 2024-01-27 03:48:13
INFO - xcom_pull 'print_result': Ok

PythonOperator - способы сохранения и чтения результата в XCOM

Передача одного значения с использованием return

Один из способов - с использованием функции return - был уже показан выше:

  • создаётся task типа PythonOperator, который вызывает python-функцию
  • сама функция возвращает значение через return
from datetime import datetime, timedelta
def get_time():
    time_is = str(datetime.now())
    print(f"Time: {time_is}")
    return time_is

t1 = PythonOperator(
    task_id='get_time',
    python_callable=get_time
)

Чтобы прочитать это значение:

  • мы создаём второй task, из-под которого также вызываем Python-функцию
  • в таске обязательно передаём контекст в вызываемую функцию
  • внутри функции вычитываем значение по имени того таска, внутри которого сохранялось это значение
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
t2 = PythonOperator(
    task_id='print_result',
    python_callable=print_result,
    provide_context=True,
    dag=dag
)

def print_result(**context):
    print(f"context: {context}")
    tsk_inst = context['ti']
    res = tsk_inst.xcom_pull(task_ids='get_time')
    print(f"res: {res}")

Передача нескольких значений с использованием return

Если требуется передать несколько значений, то ничто не мешает вернуть значение, напр., в виде списка, словаря или какого-либо другого типа данных.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from airflow.operators.python import PythonOperator
from airflow import DAG

from datetime import datetime, timedelta

args = {
    'owner': 'dao2',
    'start_date': datetime.now() - timedelta(days=1),
}

with DAG (
    dag_id="return_many",
    default_args=args,
    schedule_interval='1 */1 * * *',
    catchup=False,
    tags=["dao2", "return", "many"],
)  as dag:

    def return_many():
        value_3 = "значение 3"
        return ["значение 1", "значение 2", value_3]

    def print_result(**context):
        tsk_inst = context['ti']
        result = tsk_inst.xcom_pull(task_ids='get_time')
        print(f"result: {result}")
        print(f"2-й элемент списка: {result[1]}")


    t1 = PythonOperator(
        task_id='return_many',
        python_callable=return_many
    )

    t2 = PythonOperator(
        task_id='print_result',
        python_callable=print_result,
        provide_context=True,
        dag=dag
    )

t1 >> t2

После чего можно вычитать значение, используя синтаксис используемого типа данных:

result: ['значение 1', 'значение 2', 'значение 3']
2-й элемент списка: значение 2

Передача одного значения с использованием XCOM_push

При передаче значения через xcom_push необходимо помнить, что и в функцию на запись и в функцию на чтение требуется передавать контекст.

from airflow.operators.python import PythonOperator
from airflow import DAG

from datetime import datetime, timedelta

args = {
    'owner': 'dao2',
    'start_date': datetime.now() - timedelta(days=1),
}

with DAG (
    dag_id="return_py_xcom",
    default_args=args,
    schedule_interval='1 */1 * * *',
    catchup=False,
    tags=["dao2", "return", "py", "xcom"],
)  as dag:

    def save_value_over_xcom(**context):
        tsk_inst = context['ti']
        tsk_inst.xcom_push(key="save_value_over_xcom", value="Сохранено через XCOM")

    def print_result(**context):
        tsk_inst = context['ti']
        result = tsk_inst.xcom_pull(key="save_value_over_xcom", task_ids='save_value_over_xcom')
        print(f"result: {result}")


    t1 = PythonOperator(
        task_id='save_value_over_xcom',
        python_callable=save_value_over_xcom,
        provide_context=True,
    )

    t2 = PythonOperator(
        task_id='print_result',
        python_callable=print_result,
        provide_context=True,
        dag=dag
    )

t1 >> t2

Также - при вычитывании данных надо обязательно указывать не только задачу, из которой вычитывается значение, но и ключ, в который значение было записано:

1
2
3
4
5
# сохранение
tsk_inst.xcom_push(key="любой_уникальный_ключ", value="Сохранено через XCOM")

# чтение
result = tsk_inst.xcom_pull(key="любой_уникальный_ключ", task_ids='save_value_over_xcom')

Передача нескольких значений с использованием XCOM_push

Первый способ - это передавать значение в виде списка или любого другого типа данных. Описывалось в главе "Передача нескольких значений с использованием return".

Также можно записывать требуемые значения в отдельные xcom-значения:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from airflow.operators.python import PythonOperator
from airflow import DAG

from datetime import datetime, timedelta

args = {
    'owner': 'dao2',
    'start_date': datetime.now() - timedelta(days=1),
}

with DAG (
    dag_id="return_py_xcom",
    default_args=args,
    schedule_interval='1 */1 * * *',
    catchup=False,
    tags=["dao2", "return", "py", "xcom"],
)  as dag:

    def save_value_over_xcom(**context):
        tsk_inst = context['ti']
        tsk_inst.xcom_push(key="значение_1", value="Сохранено через XCOM 1")
        tsk_inst.xcom_push(key="значение_2", value="Сохранено через XCOM 2")
        tsk_inst.xcom_push(key="значение_3", value="Сохранено через XCOM 3")

    def print_result(**context):
        tsk_inst = context['ti']
        value_1 = tsk_inst.xcom_pull(key="значение_1", task_ids='save_value_over_xcom')
        value_2 = tsk_inst.xcom_pull(key="значение_2", task_ids='save_value_over_xcom')
        value_3 = tsk_inst.xcom_pull(key="значение_3", task_ids='save_value_over_xcom')
        print(f"result: {value_3}")


    t1 = PythonOperator(
        task_id='save_value_over_xcom',
        python_callable=save_value_over_xcom,
        provide_context=True,
    )

    t2 = PythonOperator(
        task_id='print_result',
        python_callable=print_result,
        provide_context=True,
        dag=dag
    )

t1 >> t2

Способы доступа к значениям XCOM

Ранее для чтения значений внутри задачи t2 всегда создавался отдельная python-функция, осуществляющая чтение. Если же требуется получить значение внутри задачи, напр., то к нему можно обращаться через синтаксис:

"{{ ti.xcom_pull(key='last_card_table', task_ids='get_last_tables') }}", 

Вариант применения будет продемонстрирован далее, при использовании BashOperator-а.

BashOperator - способы сохранения и чтения результата в XCOM

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from airflow.operators.bash import BashOperator
from airflow import DAG

from datetime import datetime, timedelta

args = {
    'owner': 'dao2',
    'start_date': datetime.now() - timedelta(days=1),
}

with DAG (
    dag_id="return_bash_xcom",
    default_args=args,
    schedule_interval='1 */1 * * *',
    catchup=False,
    tags=["dao2", "return", "bash", "xcom"],
)  as dag:        

    t1 = BashOperator(
        task_id="task_1",
        bash_command='echo {{ task_instance.xcom_push(key="time_now", value="$(date)") }}',
    )

    t2 = BashOperator(
        task_id="task_2",
        bash_command='echo {{ task_instance.xcom_pull(key="time_now", task_ids="task_1") }}',
    )

t1 >> t2

В отличии от предыдущих примеров сохранение и обращение к XCOM-значениям тут происходит:

  • внутри задач
  • с использованием особого синтаксиса

Данный синтаксис можно использовать и в других операторах.